/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.mongodb;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;



import com.google.auto.value.AutoValue;

import com.mongodb.DB;

import com.mongodb.DBCursor;

import com.mongodb.DBObject;

import com.mongodb.Mongo;

import com.mongodb.MongoURI;

import com.mongodb.gridfs.GridFS;

import com.mongodb.gridfs.GridFSDBFile;

import com.mongodb.gridfs.GridFSInputFile;

import com.mongodb.util.JSON;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.OutputStream;

import java.io.Serializable;

import java.nio.charset.StandardCharsets;

import java.util.ArrayList;

import java.util.Iterator;

import java.util.List;

import java.util.NoSuchElementException;

import javax.annotation.Nullable;

import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.SerializableCoder;

import com.bff.gaia.unified.sdk.coders.StringUtf8Coder;

import com.bff.gaia.unified.sdk.io.BoundedSource;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PDone;

import org.bson.types.ObjectId;

import org.joda.time.Duration;

import org.joda.time.Instant;



/**

 * IO to read and write data on MongoDB GridFS.

 *

 * <h3>Reading from MongoDB via GridFS</h3>

 *

 * <p>The MongoDbGridFSIO source returns a bounded collection of objects as a {@code PCollection<T>}.

 *

 * <p>To configure the MongoDB GridFS source, you can provide the connection URI, the database name

 * and the bucket name. If unspecified, the default values from the GridFS driver are used.

 *

 * <p>The following example illustrates various options for configuring the source:

 *

 * <pre>{@code

 * pipeline.apply(MongoDbGridFSIO.<String>read()

 *   .withUri("mongodb://localhost:27017")

 *   .withDatabase("my-database")

 *   .withBucket("my-bucket"))

 * }</pre>

 *

 * <p>The source also accepts an optional configuration: {@code withFilter()} allows you to

 * define a JSON filter to select a subset of the files in the database.

 *
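 * <p>For example, a sketch of a filter that selects files whose {@code filename} matches a

 * pattern (the query shape here is an assumption; adjust it to your file metadata):

 *

 * <pre>{@code

 * pipeline.apply(MongoDbGridFSIO.<String>read()

 *   .withUri("mongodb://localhost:27017")

 *   .withDatabase("my-database")

 *   .withBucket("my-bucket")

 *   .withFilter("{\"filename\": {\"$regex\": \"logs-.*\"}}"))

 * }</pre>

 *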

 * <p>An optional {@code Parser} (with an associated {@code Coder}) can also be specified to

 * parse each file's InputStream into objects usable with Unified. By default,

 * MongoDbGridFSIO parses each file into Strings, splitting on line breaks and using the file's

 * uploadDate as the timestamp. When a parser outputs elements with custom timestamps, you may

 * also need to specify the allowedTimestampSkew option, as in the sketch below.

 *
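 * <p>For instance, a minimal sketch of a custom parser that emits one trimmed line per element,

 * taking the event time from a hypothetical epoch-millis prefix on each line (the line format is

 * an assumption for illustration):

 *

 * <pre>{@code

 * Parser<String> parser = (file, callback) -> {

 *   try (BufferedReader reader = new BufferedReader(

 *       new InputStreamReader(file.getInputStream(), StandardCharsets.UTF_8))) {

 *     for (String line = reader.readLine(); line != null; line = reader.readLine()) {

 *       int split = line.indexOf(' ');  // hypothetical format: "<epochMillis> <payload>"

 *       callback.output(line.substring(split + 1).trim(),

 *           new Instant(Long.parseLong(line.substring(0, split))));

 *     }

 *   }

 * };

 *

 * pipeline.apply(MongoDbGridFSIO.read()

 *   .withParser(parser)

 *   .withCoder(StringUtf8Coder.of())

 *   .withSkew(Duration.standardHours(1)));

 * }</pre>

 *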

 * <h3>Writing to MongoDB via GridFS</h3>

 *

 * <p>MongoDbGridFSIO supports writing data to a file in a MongoDB GridFS bucket.

 *

 * <p>To configure a MongoDB GridFS sink, you can provide the connection URI, the database name and

 * the bucket name. You must also provide the filename to write to. Another optional parameter is

 * the GridFS file chunkSize.

 *

 * <p>For instance:

 *

 * <pre>{@code

 * pipeline

 *   .apply(...)

 *   .apply(MongoDbGridFSIO.write()

 *     .withUri("mongodb://localhost:27017")

 *     .withDatabase("my-database")

 *     .withBucket("my-bucket")

 *     .withChunkSize(256000L)

 *     .withFilename("my-output.txt"))

 * }</pre>

 *

 * <p>There is also a {@code write(WriteFn)} variant that lets you specify how each element is

 * written to the OutputStream. By default, the sink writes UTF-8 strings to the file,

 * separated by line feeds.
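 *

 * <p>For example, a minimal sketch of a custom writer for {@code Integer} elements, writing one

 * decimal value per line (connection options as above):

 *

 * <pre>{@code

 * pipeline

 *   .apply(...)

 *   .apply(MongoDbGridFSIO.<Integer>write((element, outStream) -> {

 *       outStream.write(element.toString().getBytes(StandardCharsets.UTF_8));

 *       outStream.write('\n');

 *     })

 *     .withUri("mongodb://localhost:27017")

 *     .withDatabase("my-database")

 *     .withFilename("my-output.txt"))

 * }</pre>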

 */

@Experimental(Experimental.Kind.SOURCE_SINK)

public class MongoDbGridFSIO {



  /** Callback used by the {@link Parser} to output parsed elements. */

  public interface ParserCallback<T> extends Serializable {

    /** Output the object. The default timestamp is the file's ObjectId creation time. */

    void output(T output);



    /** Output the object using the specified timestamp. */

    void output(T output, Instant timestamp);

  }



  /** Interface for the parser that turns a {@link GridFSDBFile} into objects of the appropriate type. */

  public interface Parser<T> extends Serializable {

    void parse(GridFSDBFile input, ParserCallback<T> callback) throws IOException;

  }



  /**

   * For the default {@code Read<String>} case, this is the parser that is used to split the input

   * file into Strings, one element per line. It uses the file's uploadDate as the event timestamp.

   */

  private static final Parser<String> TEXT_PARSER =

      (input, callback) -> {

        final Instant time = new Instant(input.getUploadDate().getTime());

        try (BufferedReader reader =

            new BufferedReader(

                new InputStreamReader(input.getInputStream(), StandardCharsets.UTF_8))) {

          for (String line = reader.readLine(); line != null; line = reader.readLine()) {

            callback.output(line, time);

          }

        }

      };



  /** Read data from GridFS. By default, each file is split on line breaks into Strings. */

  public static Read<String> read() {

    return new AutoValue_MongoDbGridFSIO_Read.Builder<String>()

        .setParser(TEXT_PARSER)

        .setCoder(StringUtf8Coder.of())

        .setConnectionConfiguration(ConnectionConfiguration.create())

        .setSkew(Duration.ZERO)

        .build();

  }



  /** Write data to GridFS. By default, each String element is written as a UTF-8 line. */

  public static Write<String> write() {

    return new AutoValue_MongoDbGridFSIO_Write.Builder<String>()

        .setConnectionConfiguration(ConnectionConfiguration.create())

        .setWriteFn(

            (output, outStream) -> {

              outStream.write(output.getBytes(StandardCharsets.UTF_8));

              outStream.write('\n');

            })

        .build();

  }



  /** Write data to GridFS using the given {@link WriteFn} to serialize each element. */

  public static <T> Write<T> write(WriteFn<T> fn) {

    return new AutoValue_MongoDbGridFSIO_Write.Builder<T>()

        .setWriteFn(fn)

        .setConnectionConfiguration(ConnectionConfiguration.create())

        .build();

  }



  /** Encapsulate the MongoDB GridFS connection logic. */

  @AutoValue

  public abstract static class ConnectionConfiguration implements Serializable {

    @Nullable

    abstract String uri();



    @Nullable

    abstract String database();



    @Nullable

    abstract String bucket();



    static ConnectionConfiguration create() {

      return new AutoValue_MongoDbGridFSIO_ConnectionConfiguration(null, null, null);

    }



    static ConnectionConfiguration create(String uri, String database, String bucket) {

      return new AutoValue_MongoDbGridFSIO_ConnectionConfiguration(uri, database, bucket);

    }



    Mongo setupMongo() {

      return uri() == null ? new Mongo() : new Mongo(new MongoURI(uri()));

    }



    GridFS setupGridFS(Mongo mongo) {

      DB db = database() == null ? mongo.getDB("gridfs") : mongo.getDB(database());

      return bucket() == null ? new GridFS(db) : new GridFS(db, bucket());

    }

  }



  /** A {@link PTransform} to read data from MongoDB GridFS. */

  @AutoValue

  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {



    abstract ConnectionConfiguration connectionConfiguration();



    @Nullable

    abstract Parser<T> parser();



    @Nullable

    abstract Coder<T> coder();



    @Nullable

    abstract Duration skew();



    @Nullable

    abstract String filter();



    abstract Builder<T> toBuilder();



    @AutoValue.Builder

    abstract static class Builder<T> {

      abstract Builder<T> setConnectionConfiguration(ConnectionConfiguration connection);



      abstract Builder<T> setParser(Parser<T> parser);



      abstract Builder<T> setCoder(Coder<T> coder);



      abstract Builder<T> setSkew(Duration skew);



      abstract Builder<T> setFilter(String filter);



      abstract Read<T> build();

    }



    public Read<T> withUri(String uri) {

      checkNotNull(uri);

      ConnectionConfiguration config =

          ConnectionConfiguration.create(

              uri, connectionConfiguration().database(), connectionConfiguration().bucket());

      return toBuilder().setConnectionConfiguration(config).build();

    }



    public Read<T> withDatabase(String database) {

      checkNotNull(database);

      ConnectionConfiguration config =

          ConnectionConfiguration.create(

              connectionConfiguration().uri(), database, connectionConfiguration().bucket());

      return toBuilder().setConnectionConfiguration(config).build();

    }



    public Read<T> withBucket(String bucket) {

      checkNotNull(bucket);

      ConnectionConfiguration config =

          ConnectionConfiguration.create(

              connectionConfiguration().uri(), connectionConfiguration().database(), bucket);

      return toBuilder().setConnectionConfiguration(config).build();

    }



    public <X> Read<X> withParser(Parser<X> parser) {

      checkNotNull(parser);

      Builder<X> builder = (Builder<X>) toBuilder();

      return builder.setParser(parser).setCoder(null).build();

    }



    public Read<T> withCoder(Coder<T> coder) {

      checkNotNull(coder);

      return toBuilder().setCoder(coder).build();

    }



    public Read<T> withSkew(Duration skew) {

      return toBuilder().setSkew(skew == null ? Duration.ZERO : skew).build();

    }



    public Read<T> withFilter(String filter) {

      return toBuilder().setFilter(filter).build();

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);

      builder.addIfNotNull(DisplayData.item("uri", connectionConfiguration().uri()));

      builder.addIfNotNull(DisplayData.item("database", connectionConfiguration().database()));

      builder.addIfNotNull(DisplayData.item("bucket", connectionConfiguration().bucket()));

      builder.addIfNotNull(DisplayData.item("parser", parser().getClass().getName()));

      builder.addIfNotNull(DisplayData.item("coder", coder().getClass().getName()));

      builder.addIfNotNull(DisplayData.item("skew", skew()));

      builder.addIfNotNull(DisplayData.item("filter", filter()));

    }



    @Override

    public PCollection<T> expand(PBegin input) {

      final BoundedGridFSSource source = new BoundedGridFSSource(this, null);

      com.bff.gaia.unified.sdk.io.Read.Bounded<ObjectId> objectIds =

          com.bff.gaia.unified.sdk.io.Read.from(source);

      PCollection<T> output =

          input

              .getPipeline()

              .apply(objectIds)

              .apply(

                  ParDo.of(

                      new DoFn<ObjectId, T>() {

                        Mongo mongo;

                        GridFS gridfs;



                        @Setup

                        public void setup() {

                          mongo = source.spec.connectionConfiguration().setupMongo();

                          gridfs = source.spec.connectionConfiguration().setupGridFS(mongo);

                        }



                        @Teardown

                        public void teardown() {

                          mongo.close();

                        }



                        @ProcessElement

                        public void processElement(final ProcessContext c) throws IOException {

                          ObjectId oid = c.element();

                          GridFSDBFile file = gridfs.find(oid);

                          parser()

                              .parse(

                                  file,

                                  new ParserCallback<T>() {

                                    @Override

                                    public void output(T output, Instant timestamp) {

                                      checkNotNull(timestamp);

                                      c.outputWithTimestamp(output, timestamp);

                                    }



                                    @Override

                                    public void output(T output) {

                                      c.output(output);

                                    }

                                  });

                        }



                        @Override

                        public Duration getAllowedTimestampSkew() {

                          return skew();

                        }

                      }));

      if (coder() != null) {

        output.setCoder(coder());

      }

      return output;

    }



    /** A {@link BoundedSource} for MongoDB GridFS. */

    protected static class BoundedGridFSSource extends BoundedSource<ObjectId> {



      private Read<?> spec;



      /** ObjectIds of the files in this bundle; null until the source has been split. */

      @Nullable

      private List<ObjectId> objectIds;



      BoundedGridFSSource(Read<?> spec, List<ObjectId> objectIds) {

        this.spec = spec;

        this.objectIds = objectIds;

      }



      private DBCursor createCursor(GridFS gridfs) {

        if (spec.filter() != null) {

          DBObject query = (DBObject) JSON.parse(spec.filter());

          return gridfs.getFileList(query);

        }

        return gridfs.getFileList();

      }



      @Override

      public List<? extends BoundedSource<ObjectId>> split(

          long desiredBundleSizeBytes, PipelineOptions options) throws Exception {

        Mongo mongo = spec.connectionConfiguration().setupMongo();

        try {

          GridFS gridfs = spec.connectionConfiguration().setupGridFS(mongo);

          DBCursor cursor = createCursor(gridfs);

          long size = 0;

          List<BoundedGridFSSource> list = new ArrayList<>();

          List<ObjectId> objects = new ArrayList<>();

          while (cursor.hasNext()) {
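            // Greedily pack files into the current bundle; start a new bundle once adding

            // the next file would push it past desiredBundleSizeBytes.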

            GridFSDBFile file = (GridFSDBFile) cursor.next();

            long len = file.getLength();

            if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) {

              list.add(new BoundedGridFSSource(spec, objects));

              size = 0;

              objects = new ArrayList<>();

            }

            objects.add((ObjectId) file.getId());

            size += len;

          }
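          // Emit the remaining files as a final bundle; if no files matched, still emit a

          // single empty source.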

          if (!objects.isEmpty() || list.isEmpty()) {

            list.add(new BoundedGridFSSource(spec, objects));

          }

          return list;

        } finally {

          mongo.close();

        }

      }



      @Override

      public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {

        Mongo mongo = spec.connectionConfiguration().setupMongo();

        try {

          GridFS gridfs = spec.connectionConfiguration().setupGridFS(mongo);

          DBCursor cursor = createCursor(gridfs);

          long size = 0;

          while (cursor.hasNext()) {

            GridFSDBFile file = (GridFSDBFile) cursor.next();

            size += file.getLength();

          }

          return size;

        } finally {

          mongo.close();

        }

      }



      @Override

      public BoundedSource.BoundedReader<ObjectId> createReader(PipelineOptions options)

          throws IOException {

        return new GridFSReader(this, objectIds);

      }



      @Override

      public void populateDisplayData(DisplayData.Builder builder) {

        spec.populateDisplayData(builder);

      }



      @Override

      public Coder<ObjectId> getOutputCoder() {

        return SerializableCoder.of(ObjectId.class);

      }



      static class GridFSReader extends BoundedSource.BoundedReader<ObjectId> {

        final BoundedGridFSSource source;



        /* When the source has been split into bundles, this records the ObjectIds of

         * the files for this bundle. Otherwise, this is null and a DBCursor over the

         * files is used directly, so the ObjectIds are not queried and loaded ahead

         * of time, saving time and memory.

         */

        @Nullable

        final List<ObjectId> objects;



        Mongo mongo;

        DBCursor cursor;

        Iterator<ObjectId> iterator;

        ObjectId current;



        GridFSReader(BoundedGridFSSource source, List<ObjectId> objects) {

          this.source = source;

          this.objects = objects;

        }



        @Override

        public BoundedSource<ObjectId> getCurrentSource() {

          return source;

        }



        @Override

        public boolean start() throws IOException {

          if (objects == null) {

            mongo = source.spec.connectionConfiguration().setupMongo();

            GridFS gridfs = source.spec.connectionConfiguration().setupGridFS(mongo);

            cursor = source.createCursor(gridfs);

          } else {

            iterator = objects.iterator();

          }

          return advance();

        }



        @Override

        public boolean advance() throws IOException {

          if (iterator != null && iterator.hasNext()) {

            current = iterator.next();

            return true;

          } else if (cursor != null && cursor.hasNext()) {

            GridFSDBFile file = (GridFSDBFile) cursor.next();

            current = (ObjectId) file.getId();

            return true;

          }

          current = null;

          return false;

        }



        @Override

        public ObjectId getCurrent() throws NoSuchElementException {

          if (current == null) {

            throw new NoSuchElementException();

          }

          return current;

        }



        @Override

        public Instant getCurrentTimestamp() throws NoSuchElementException {

          if (current == null) {

            throw new NoSuchElementException();

          }
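          // ObjectId timestamps have whole-second precision; convert to milliseconds.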

          long time = current.getTimestamp();

          time *= 1000L;

          return new Instant(time);

        }



        @Override

        public void close() throws IOException {

          if (mongo != null) {

            mongo.close();

          }

        }

      }

    }

  }



  /** Function that is called to write the data to the given GridFS OutputStream. */

  public interface WriteFn<T> extends Serializable {

    /**

     * Output the object to the given OutputStream.

     *

     * @param output The data to output

     * @param outStream The OutputStream

     */

    void write(T output, OutputStream outStream) throws IOException;

  }



  /** A {@link PTransform} to write data to MongoDB GridFS. */

  @AutoValue

  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {

    abstract ConnectionConfiguration connectionConfiguration();



    @Nullable

    abstract Long chunkSize();



    abstract WriteFn<T> writeFn();



    @Nullable

    abstract String filename();



    abstract Builder<T> toBuilder();



    @AutoValue.Builder

    abstract static class Builder<T> {

      abstract Builder<T> setConnectionConfiguration(ConnectionConfiguration connection);



      abstract Builder<T> setFilename(String filename);



      abstract Builder<T> setChunkSize(Long chunkSize);



      abstract Builder<T> setWriteFn(WriteFn<T> fn);



      abstract Write<T> build();

    }



    public Write<T> withUri(String uri) {

      checkNotNull(uri);

      ConnectionConfiguration config =

          ConnectionConfiguration.create(

              uri, connectionConfiguration().database(), connectionConfiguration().bucket());

      return toBuilder().setConnectionConfiguration(config).build();

    }



    public Write<T> withDatabase(String database) {

      checkNotNull(database);

      ConnectionConfiguration config =

          ConnectionConfiguration.create(

              connectionConfiguration().uri(), database, connectionConfiguration().bucket());

      return toBuilder().setConnectionConfiguration(config).build();

    }



    public Write<T> withBucket(String bucket) {

      checkNotNull(bucket);

      ConnectionConfiguration config =

          ConnectionConfiguration.create(

              connectionConfiguration().uri(), connectionConfiguration().database(), bucket);

      return toBuilder().setConnectionConfiguration(config).build();

    }



    public Write<T> withFilename(String filename) {

      checkNotNull(filename);

      return toBuilder().setFilename(filename).build();

    }



    public Write<T> withChunkSize(Long chunkSize) {

      checkNotNull(chunkSize);

      checkArgument(chunkSize > 1, "Chunk size must be greater than 1, but was %s", chunkSize);

      return toBuilder().setChunkSize(chunkSize).build();

    }



    public void validate(T input) {

      checkNotNull(filename(), "filename");

      checkNotNull(writeFn(), "writeFn");

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);

      builder.addIfNotNull(DisplayData.item("uri", connectionConfiguration().uri()));

      builder.addIfNotNull(DisplayData.item("database", connectionConfiguration().database()));

      builder.addIfNotNull(DisplayData.item("bucket", connectionConfiguration().bucket()));

      builder.addIfNotNull(DisplayData.item("chunkSize", chunkSize()));

      builder.addIfNotNull(DisplayData.item("filename", filename()));

    }



    @Override

    public PDone expand(PCollection<T> input) {

      input.apply(ParDo.of(new GridFsWriteFn<>(this)));

      return PDone.in(input.getPipeline());

    }

  }



  private static class GridFsWriteFn<T> extends DoFn<T, Void> {



    private final Write<T> spec;



    private transient Mongo mongo;

    private transient GridFS gridfs;



    private transient GridFSInputFile gridFsFile;

    private transient OutputStream outputStream;



    public GridFsWriteFn(Write<T> spec) {

      this.spec = spec;

    }



    @Setup

    public void setup() throws Exception {

      mongo = spec.connectionConfiguration().setupMongo();

      gridfs = spec.connectionConfiguration().setupGridFS(mongo);

    }



    @StartBundle

    public void startBundle() {

      gridFsFile = gridfs.createFile(spec.filename());

      if (spec.chunkSize() != null) {

        gridFsFile.setChunkSize(spec.chunkSize());

      }

      outputStream = gridFsFile.getOutputStream();

    }



    @ProcessElement

    public void processElement(ProcessContext context) throws Exception {

      T record = context.element();

      spec.writeFn().write(record, outputStream);

    }



    @FinishBundle

    public void finishBundle() throws Exception {

      if (gridFsFile != null) {

        outputStream.flush();

        outputStream.close();

        outputStream = null;

        gridFsFile = null;

      }

    }



    @Teardown

    public void teardown() throws Exception {
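      // Defensively close any file left open if finishBundle did not run (e.g., after a failure).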

      try {

        if (gridFsFile != null) {

          outputStream.flush();

          outputStream.close();

          outputStream = null;

          gridFsFile = null;

        }

      } finally {

        if (mongo != null) {

          mongo.close();

          mongo = null;

          gridfs = null;

        }

      }

    }

  }

}